package com.xxl.job.core.thread;

import com.fish.common.core.util.ReturnT;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.ThrowableUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Worker thread bound to a single job id.
 *
 * <p>
 * Trigger requests are queued through {@link #pushTriggerQueue(TriggerParam)} and
 * consumed one at a time by {@link #run()}, which executes the job handler and pushes
 * one callback per trigger to {@link TriggerCallbackThread}. The thread keeps running
 * until {@link #toStop(String)} is called.
 *
 * <p>
 * Thread-safety: {@code pushTriggerQueue}, {@code toStop} and
 * {@code isRunningOrHasQueue} are meant to be called from other threads; the state they
 * share with {@code run()} is either {@code volatile} or a concurrent collection.
 *
 * @author xuxueli
 * @since 2016-1-16 19:52:47
 */
public class JobThread extends Thread {

	private static final Logger logger = LoggerFactory.getLogger(JobThread.class);

	/**
	 * seconds to block on the trigger queue before re-checking the stop flag
	 */
	private static final long QUEUE_POLL_SECONDS = 3L;

	/**
	 * number of consecutive empty polls after which removal of this thread is requested
	 */
	private static final int MAX_IDLE_TIMES = 30;

	/**
	 * handle messages longer than this are truncated before being reported
	 */
	private static final int MAX_HANDLE_MSG_LENGTH = 50000;

	private final int jobId;

	private final IJobHandler handler;

	private final LinkedBlockingQueue<TriggerParam> triggerQueue;

	/**
	 * avoid repeat trigger for the same TRIGGER_LOG_ID
	 */
	private final Set<Long> triggerLogIdSet;

	private volatile boolean toStop = false;

	/**
	 * written by the caller of toStop(), read by the job thread; volatile so the job
	 * thread never reports a stale reason
	 */
	private volatile String stopReason;

	/**
	 * whether a job is currently being executed; written by the job thread and read by
	 * other threads through isRunningOrHasQueue(), hence volatile
	 */
	private volatile boolean running = false;

	/**
	 * idle times: consecutive polls that returned no trigger (job thread only)
	 */
	private int idleTimes = 0;

	public JobThread(int jobId, IJobHandler handler) {
		this.jobId = jobId;
		this.handler = handler;
		this.triggerQueue = new LinkedBlockingQueue<>();
		this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<>());

		// assign job thread name
		this.setName("xxl-job, JobThread-" + jobId + "-" + System.currentTimeMillis());
	}

	public IJobHandler getHandler() {
		return handler;
	}

	/**
	 * Adds a new trigger to the queue unless a trigger with the same log id is already
	 * waiting.
	 * @param triggerParam the trigger request to enqueue
	 * @return {@code ReturnT.SUCCESS} when queued, otherwise a result carrying
	 * {@code ReturnT.FAIL_CODE} and a message naming the repeated log id
	 */
	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
		// avoid repeat: Set.add reports whether the id was newly inserted, so the
		// check and the insert happen as one atomic step on the synchronized set
		// (a separate contains() + add() would let two concurrent callers both pass)
		if (!triggerLogIdSet.add(triggerParam.getLogId())) {
			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
			return ReturnT.instance(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
		}

		triggerQueue.add(triggerParam);
		return ReturnT.SUCCESS;
	}

	/**
	 * Asks the job thread to stop.
	 * @param stopReason reason reported in the callbacks of killed triggers
	 */
	public void toStop(String stopReason) {
		/*
		 * Thread.interrupt only ends a thread's blocking state (wait, join, sleep) by
		 * raising InterruptedException at the blocking point; it does not terminate the
		 * running thread itself. Fully stopping this thread therefore relies on a shared
		 * flag.
		 */
		// publish the reason before raising the flag, so that whoever observes
		// toStop == true also observes the matching reason
		this.stopReason = stopReason;
		this.toStop = true;
	}

	/**
	 * Tells whether a job is being executed or triggers are still waiting.
	 * @return {@code true} if a job is running or the trigger queue is not empty
	 */
	public boolean isRunningOrHasQueue() {
		return running || !triggerQueue.isEmpty();
	}

	/**
	 * Thread body: initializes the handler, processes triggers until stopped, reports
	 * every still-queued trigger as killed and finally destroys the handler.
	 */
	@Override
	public void run() {

		// init
		try {
			handler.init();
		}
		catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		// execute
		while (!toStop) {
			running = false;
			idleTimes++;

			TriggerParam triggerParam = null;
			try {
				// to check the toStop signal we need to cycle, so we cannot use
				// queue.take(); poll(timeout) is used instead
				triggerParam = triggerQueue.poll(QUEUE_POLL_SECONDS, TimeUnit.SECONDS);
				if (triggerParam != null) {
					running = true;
					idleTimes = 0;
					triggerLogIdSet.remove(triggerParam.getLogId());

					executeTrigger(triggerParam);
				}
				else if (idleTimes > MAX_IDLE_TIMES) {
					// avoid concurrent trigger causes jobId-lost
					if (triggerQueue.isEmpty()) {
						XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
					}
				}
			}
			catch (Throwable e) {
				if (toStop) {
					XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				// handle result
				String errorMsg = ThrowableUtil.toString(e);

				XxlJobHelper.handleFail(errorMsg);

				XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg
						+ "<br>----------- xxl-job job execute end(error) -----------");
			}
			finally {
				if (triggerParam != null) {
					pushExecutionCallback(triggerParam);
				}
			}
		}

		// callback trigger request in queue
		callbackQueuedTriggersAsKilled();

		// destroy
		try {
			handler.destroy();
		}
		catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}

	/**
	 * Builds the job context for one trigger, runs the handler (with a time limit when
	 * the trigger requests one) and validates the handle result.
	 * @param triggerParam the trigger taken from the queue
	 * @throws Exception whatever the handler or the timed execution throws
	 */
	private void executeTrigger(TriggerParam triggerParam) throws Exception {
		// log filename, like "logPath/yyyy-MM-dd/9999.log"
		String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()),
				triggerParam.getLogId());
		XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(), triggerParam.getExecutorParams(),
				logFileName, triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal());

		// init job context
		XxlJobContext.setXxlJobContext(xxlJobContext);

		// execute
		XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:"
				+ xxlJobContext.getJobParam());

		if (triggerParam.getExecutorTimeout() > 0) {
			// limit timeout
			executeWithTimeout(xxlJobContext, triggerParam);
		}
		else {
			// just execute
			handler.execute();
		}

		// valid execute handle data
		if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
			XxlJobHelper.handleFail("job handle result lost.");
		}
		else {
			String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
			if (tempHandleMsg != null && tempHandleMsg.length() > MAX_HANDLE_MSG_LENGTH) {
				tempHandleMsg = tempHandleMsg.substring(0, MAX_HANDLE_MSG_LENGTH).concat("...");
			}
			XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
		}
		XxlJobHelper
			.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
					+ XxlJobContext.getXxlJobContext().getHandleCode() + ", handleMsg = "
					+ XxlJobContext.getXxlJobContext().getHandleMsg());
	}

	/**
	 * Runs the handler on a separate thread and waits for it for at most the trigger's
	 * executor timeout (seconds). On timeout the result is marked through
	 * {@code XxlJobHelper.handleTimeout}; in every case the helper thread is interrupted
	 * afterwards.
	 * @param xxlJobContext context to install on the helper thread
	 * @param triggerParam trigger supplying the timeout
	 * @throws Exception if waiting is interrupted or the handler itself failed
	 */
	private void executeWithTimeout(XxlJobContext xxlJobContext, TriggerParam triggerParam) throws Exception {
		Thread futureThread = null;
		try {
			FutureTask<Boolean> futureTask = new FutureTask<>(() -> {

				// init job context
				XxlJobContext.setXxlJobContext(xxlJobContext);

				handler.execute();
				return true;
			});
			futureThread = new Thread(futureTask);
			futureThread.start();

			futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
		}
		catch (TimeoutException e) {

			XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
			XxlJobHelper.log(e);

			// handle result
			XxlJobHelper.handleTimeout("job execute timeout ");
		}
		finally {
			// null only if creating the task/thread failed; in that case let the
			// original failure propagate instead of masking it
			if (futureThread != null) {
				futureThread.interrupt();
			}
		}
	}

	/**
	 * Pushes the callback for a trigger that was taken from the queue: the handle result
	 * stored in the job context normally, or a failure when the thread is being stopped.
	 * @param triggerParam the trigger whose outcome is reported
	 */
	private void pushExecutionCallback(TriggerParam triggerParam) {
		if (toStop) {
			// is killed
			TriggerCallbackThread
				.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(),
						XxlJobContext.HANDLE_CODE_FAIL, stopReason + " [job running, killed]"));
			return;
		}

		XxlJobContext context = XxlJobContext.getXxlJobContext();
		if (context == null) {
			// the trigger failed before a context was installed; still report a
			// failure rather than dying here without any callback
			TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),
					triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL, "job handle result lost."));
			return;
		}

		// common
		TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),
				triggerParam.getLogDateTime(), context.getHandleCode(), context.getHandleMsg()));
	}

	/**
	 * Reports every trigger still waiting in the queue as failed because the thread was
	 * stopped before executing it.
	 */
	private void callbackQueuedTriggersAsKilled() {
		while (!triggerQueue.isEmpty()) {
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam != null) {
				// is killed
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),
						triggerParam.getLogDateTime(), XxlJobContext.HANDLE_CODE_FAIL,
						stopReason + " [job not executed, in the job queue, killed.]"));
			}
		}
	}

}
